KAFKA-10317: Global thread should honor shutdown signal during bootstrapping#22417
KAFKA-10317: Global thread should honor shutdown signal during bootstrapping#22417lucliu1108 wants to merge 4 commits into
Conversation
chickenchickenlove
left a comment
There was a problem hiding this comment.
Thanks for your hard work!
I left a comment.
When you get a chance, please take a look 🙇♂️
| if (inErrorStateSupplier.getAsBoolean()) { | ||
| logBootstrapInterrupted(storeMetadata); | ||
| return; |
There was a problem hiding this comment.
Could we make the shutdown-interrupted bootstrap path explicit instead of returning normally from GlobalStateManagerImpl?
Currently, when inErrorStateSupplier.getAsBoolean() is true, restoreState() / reprocessState() just return, so GlobalStateManagerImpl#initialize() can also return as if bootstrap completed successfully. As a result, GlobalStateUpdateTask#initialize() may continue into initTopology(), processorContext.initialize(), and flushState() even though shutdown has already been requested.
Since initTopology() can invoke user-provided Processor#init(), this could unnecessarily open external resources during shutdown. Maybe this should use an explicit internal signal, such as a dedicated bootstrap-interrupted exception caught only on the clean shutdown path, or return an initialize status like completed/interrupted so the follow-up initialization can be skipped.
What do you think?
Summary
This PR introduces a shutdown-aware bootstrap loop in
GlobalStateManagerImpland aconsumer.wakeup()call duringGlobalStreamThread.shutdown()that together letKafkaStreams#close()interrupt global-store restoration in progress, instead of waiting for the entire changelog to be replayed.Ticket: https://issues.apache.org/jira/browse/KAFKA-10317
Implementation
The global thread passes its
inErrorState()predicate to the state manager, which checks it before each batch in the bootstrap poll loop and exits cleanly when shutdown is requested. Thewakeup()call additionally interrupts any in-flightpoll()so shutdown takes effect right away, even if the loop is currently blocked on a fetch. A matching WakeupException catch in the main update loop ensures clean shutdowns aren't reported through the uncaught-exception handler.Tests
Added unit tests in
GlobalStateManagerImplTestcovering the supplier check andWakeupExceptionhandling in bothrestoreStateandreprocessState, and end-to-end tests inGlobalStreamThreadTestfor the close-during-bootstrap scenario.